package com.fone.flumeExt.interceptor;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Base Flume interceptor that enriches events with routing headers.
 *
 * <p>For every event that is not dropped, the interceptor:
 * <ul>
 *   <li>writes a {@code tablename} header derived from the
 *       {@code flume.client.log4j.logger.name} header via {@link #changeLogName(String)};</li>
 *   <li>writes zero-padded date/time headers ({@code fy}, {@code fm}, {@code fd}, {@code fh},
 *       {@code fmin}, {@code fs}) computed from the {@code flume.client.log4j.timestamp}
 *       header, interpreted as epoch milliseconds in the {@code GMT+08:00} time zone.</li>
 * </ul>
 *
 * <p>NOTE(review): a single mutable {@link Calendar} instance is shared by all calls on this
 * object, so concurrent calls to {@link #intercept(Event)} on the same instance could
 * interleave — confirm how the hosting source invokes the interceptor.
 */
public abstract class AvroHeaderInterceptor implements Interceptor {

	/** The Constant logger. */
	private static final Logger logger = LoggerFactory.getLogger(AvroHeaderInterceptor.class);

	/** Header holding the name of the logger that produced the event. */
	private static final String LOGGER_NAME_HEADER = "flume.client.log4j.logger.name";

	/** Header holding the event timestamp (decimal digits, read as epoch milliseconds). */
	private static final String TIMESTAMP_HEADER = "flume.client.log4j.timestamp";

	/** Header written by this interceptor with the resolved table name. */
	private static final String TABLE_NAME_HEADER = "tablename";

	/** Events whose logger name contains this text are dropped. */
	private static final String FLUME_API_PACKAGE = "org.apache.flume.api.";

	/** Calendar (GMT+08:00) used to break a timestamp into its date/time fields. */
	private final Calendar calendar;

	/**
	 * Instantiates a new avro header interceptor whose calendar uses the
	 * {@code GMT+08:00} time zone.
	 */
	protected AvroHeaderInterceptor() {
		calendar = Calendar.getInstance(TimeZone.getTimeZone("GMT+08:00"));
	}

	/**
	 * Maps a logger name to the value stored in the {@code tablename} header.
	 *
	 * @param name the logger name taken from the event headers (never {@code null} when
	 *        called from {@link #intercept(Event)})
	 * @return the table name; when {@code null} or empty is returned, the logger name itself
	 *         is used instead
	 */
	public abstract String changeLogName(String name);

	/* (non-Javadoc)
	 * @see org.apache.flume.interceptor.Interceptor#initialize()
	 */
	@Override
	public void initialize() {
		// Nothing to initialize.
	}

	/**
	 * Adds the table-name and date/time headers to a single event.
	 *
	 * @param event the event to enrich; may be {@code null}
	 * @return {@code null} when {@code event} is {@code null} or when its logger name contains
	 *         {@code org.apache.flume.api.}; otherwise the same event instance with its
	 *         headers updated in place
	 * @see org.apache.flume.interceptor.Interceptor#intercept(org.apache.flume.Event)
	 */
	@Override
	public Event intercept(Event event) {
		if (event == null) {
			return null;
		}
		Map<String, String> eventHeaders = event.getHeaders();

		String loggerName = eventHeaders.get(LOGGER_NAME_HEADER);
		if (loggerName == null) {
			// Previously this case threw a NullPointerException; let the event through
			// without a table name rather than failing the whole delivery.
			logger.warn("no logger name!");
		} else {
			// Drop events emitted by Flume's own client classes,
			// e.g. org.apache.flume.api.LoadBalancingRpcClient.
			if (loggerName.trim().contains(FLUME_API_PACKAGE)) {
				return null;
			}
			String tableName = changeLogName(loggerName);
			if (tableName == null || tableName.isEmpty()) {
				tableName = loggerName;
			}
			eventHeaders.put(TABLE_NAME_HEADER, tableName);
		}

		Long timestamp = parseTimestamp(eventHeaders.get(TIMESTAMP_HEADER));
		if (timestamp == null) {
			logger.warn("no  timestamp!");
		} else {
			putTimeHeaders(eventHeaders, timestamp.longValue());
		}
		return event;
	}

	/**
	 * Parses a timestamp header value.
	 *
	 * @param raw the raw header value; may be {@code null}
	 * @return the parsed value, or {@code null} when {@code raw} is {@code null}, blank,
	 *         contains anything other than ASCII digits after trimming, or does not fit
	 *         in a {@code long}
	 */
	private static Long parseTimestamp(String raw) {
		if (raw == null) {
			return null;
		}
		String digits = raw.trim();
		if (digits.isEmpty()) {
			return null;
		}
		for (int i = 0; i < digits.length(); i++) {
			char c = digits.charAt(i);
			if (c < '0' || c > '9') {
				return null;
			}
		}
		try {
			return Long.valueOf(digits);
		} catch (NumberFormatException e) {
			// All digits, but too large for a long.
			return null;
		}
	}

	/**
	 * Writes the year/month/day/hour/minute/second headers for the given instant.
	 *
	 * @param headers the event headers to update
	 * @param millis the instant, in epoch milliseconds
	 */
	private void putTimeHeaders(Map<String, String> headers, long millis) {
		calendar.setTimeInMillis(millis);
		headers.put("fy", String.valueOf(calendar.get(Calendar.YEAR)));
		// Calendar.MONTH is zero-based (0 means January).
		headers.put("fm", twoDigits(calendar.get(Calendar.MONTH) + 1));
		headers.put("fd", twoDigits(calendar.get(Calendar.DAY_OF_MONTH)));
		headers.put("fh", twoDigits(calendar.get(Calendar.HOUR_OF_DAY)));
		headers.put("fmin", twoDigits(calendar.get(Calendar.MINUTE)));
		headers.put("fs", twoDigits(calendar.get(Calendar.SECOND)));
	}

	/**
	 * Formats a value with a leading zero when it is below ten.
	 *
	 * @param value the value to format
	 * @return the decimal text of {@code value}, prefixed with {@code "0"} when it is below ten
	 */
	private static String twoDigits(int value) {
		return value < 10 ? "0" + value : String.valueOf(value);
	}

	/**
	 * Applies {@link #intercept(Event)} to each event and keeps the non-{@code null} results.
	 *
	 * @param events the events to process
	 * @return a new list holding the surviving events in their original order
	 * @see org.apache.flume.interceptor.Interceptor#intercept(java.util.List)
	 */
	@Override
	public List<Event> intercept(List<Event> events) {
		List<Event> out = Lists.newArrayList();
		for (Event event : events) {
			Event outEvent = intercept(event);
			if (outEvent != null) {
				out.add(outEvent);
			}
		}
		return out;
	}

	/* (non-Javadoc)
	 * @see org.apache.flume.interceptor.Interceptor#close()
	 */
	@Override
	public void close() {
		// Nothing to release.
	}
}
